package com.offcn.bigdata.streaming.p2

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies, PerPartitionConfig}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * SparkStreaming和Kafka的整合
  *
  * spark.streaming.kafka.maxRatePerPartition: 每秒streaming从kafka每隔分区读取的最大记录条数
  * spark.streaming.kafka.minRatePerPartition: 每秒streaming从kafka每隔分区读取的最小记录条数
  * [min, max]
  * spark.streaming.backpressure.enabled: streaming的背压机制，默认为false
  *     如果streaming从kafka读取数据的速率和消费数据的速率不一致，可能会造成数据积压，数据量一多，就会有造成程序崩溃的风险，
  *     所以通过这个背压机制，streaming可以动态的调节从kafka中读取数据的速率，范围就在上述的[min, max]
  *
  *  LocationStrategy:
  *     在kafka0.10之后，是先从partition中读取数据，所以为适当的executor去缓存一个消费者就会显得尤为重要，而不是每次读取数据前都重新创建一个新的consumer。
  *     提供3中Location策略：
  *         PreferBrokers   ： 如果消费kafka数据的executor和kafka的partition分别在相同的节点上面，请使用这种方式。
  *         PreferConsistent： 满足不了PerferBrokers，就是用这种，最常用的一种，就把分区分布给集群中的各个executor。
  *         PreferFixed     ： 如果网络读取数据的性能不均匀，就用特定的host去关联特点的partition进行读取数据，其余的则使用PreferConsistent
  *  consumerStrategy:
  *        使用那么消费的策略去读取kafka中的数据
  *         Subscribe
  *         SubscribePattern
  *         Assign
  */
object _01Streaming2KafkaOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName("_01Streaming2KafkaOps")
            .setMaster("local")
//            .set("spark.streaming.kafka.maxRatePerPartition", "10")
        val batchInterval = Seconds(2)
        val ssc = new StreamingContext(conf, batchInterval)
        //该参数主要配置的就是streaming从kafka中读取数据的速率，每秒每个分区的最大和最小的速率
        val myPerPartitionConfig = new PerPartitionConfig {
            override def maxRatePerPartition(topicPartition: TopicPartition): Long = 10
            override def minRatePerPartition(topicPartition: TopicPartition): Long = 5
        }

        val topics = Set("hadoop")
        val kafkaParams = Map[String, Object](
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "bootstrap.servers" -> "bigdata01:9092,bigdat02:9092,bigdata03:9092",
            "group.id" -> "group-1",
            "auto.offset.reset" -> "earliest",
            "enable.auto.commit" -> "true"
        )
        //从kafka中读取数据
        val messages: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
                            LocationStrategies.PreferConsistent,
                            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
                            myPerPartitionConfig)

        messages.foreachRDD((rdd, bTime) => {
            if(!rdd.isEmpty()) {
                println("-------------------------------------------")
                println(s"Time: $bTime")
                println("-------------------------------------------")
                rdd.foreach(record => println(record))
            }
        })
        ssc.start
        ssc.awaitTermination()
    }
}
